package com.pms.api;

import com.pms.exception.R;
import com.pms.service.IWaterPumpGroupAttrService;
import com.pms.util.DateUtil;
import com.pms.utils.KafkaUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * REST controller for manually exercising the Kafka integration.
 *
 * <p>All endpoints are mapped under {@code testKafka} and accept POST requests.
 * Originally created by Administrator on 2018/1/9.
 */
@RestController
@Api(value = "测试kafta接口", description = "测试kafta接口")
@RequestMapping(value = "testKafka", method = RequestMethod.POST)
public class KafkaTestApi {
    /** Pattern used when printing the receive time of each record. */
    private static final String RECEIVE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss:SSS";

    // Injected by Spring; not referenced by the endpoint in this class.
    @Autowired
    IWaterPumpGroupAttrService waterPumpAttrService;

    private final KafkaUtil kafkaUtil = new KafkaUtil();

    /**
     * Sends a fixed sample message ({@code "key1"} / {@code "value1"}) to the given topic,
     * then subscribes to that topic and returns the records handed back by
     * {@link KafkaUtil#subscribe}.
     *
     * <p>Each record is also printed to standard output together with the time it was
     * processed here.
     *
     * @param topic the Kafka topic to publish to and read from; must not be blank
     * @return an error result with code 400 when {@code topic} is blank; otherwise an OK
     *         result whose {@code "data"} entry maps {@code "<partition>-<offset>: <key>"}
     *         to the record value, in the order the records were iterated
     */
    @ApiOperation(value = "订阅kafka消息")
    @RequestMapping(value = "/subscribeKafka", method = RequestMethod.POST)
    @ApiImplicitParams({
            // Declared as required because a blank topic is rejected below with a 400.
            @ApiImplicitParam(name = "topic", value = "订阅主题", required = true, dataType = "string", paramType = "form"),
    })
    public R subscribeKafka(String topic) {
        if (StringUtils.isBlank(topic)) {
            return R.error(400, "主题不能为空");
        }

        // Publish a sample message first so there is something to read back.
        kafkaUtil.send(topic, "key1", "value1");
        ConsumerRecords<String, String> records = kafkaUtil.subscribe(topic);

        // LinkedHashMap keeps the response entries in the same order the records were iterated.
        Map<String, Object> data = new LinkedHashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            String position = record.partition() + "-" + record.offset();
            String entryKey = position + ": " + record.key();
            System.out.println("接受时间=" + DateUtil.longToString(System.currentTimeMillis(), RECEIVE_TIME_PATTERN));
            System.out.println(entryKey + " " + record.value());
            data.put(entryKey, record.value());
        }
        return R.ok().put("data", data);
    }
}
